In [1]:
import tensorflow as tf
import pandas as pd
import numpy as np
import shutil
import math
import multiprocessing
from datetime import datetime
from tensorflow.python.feature_column import feature_column
print(tf.__version__)


/Users/khalidsalama/anaconda/lib/python3.6/importlib/_bootstrap.py:205: RuntimeWarning: compiletime version 3.5 of module 'tensorflow.python.framework.fast_tensor_util' does not match runtime version 3.6
  return f(*args, **kwds)
1.4.0

Steps to use the TF Estimator APIs

  1. Define dataset metadata
  2. Define a data input function to read the data from a Pandas dataframe + apply feature processing
  3. Create TF feature columns based on metadata + extended feature columns
  4. Instantiate an estimator with the required feature columns & parameters
  5. Train estimator using training data
  6. Evaluate estimator using test data
  7. Perform predictions
  8. Save & Serve the estimator

In [2]:
MODEL_NAME = 'reg-model-01'

TRAIN_DATA_FILE = 'data/train-data.csv'
VALID_DATA_FILE = 'data/valid-data.csv'
TEST_DATA_FILE = 'data/test-data.csv'

RESUME_TRAINING = False
PROCESS_FEATURES = True
MULTI_THREADING = False

1. Define Dataset Metadata

  • CSV file header and defaults
  • Numeric and categorical feature names
  • Target feature name
  • Unused columns

In [3]:
HEADER = ['key','x','y','alpha','beta','target']
HEADER_DEFAULTS = [[0], [0.0], [0.0], ['NA'], ['NA'], [0.0]]

NUMERIC_FEATURE_NAMES = ['x', 'y']  

CATEGORICAL_FEATURE_NAMES_WITH_VOCABULARY = {'alpha':['ax01', 'ax02'], 'beta':['bx01', 'bx02']}
CATEGORICAL_FEATURE_NAMES = list(CATEGORICAL_FEATURE_NAMES_WITH_VOCABULARY.keys())

FEATURE_NAMES = NUMERIC_FEATURE_NAMES + CATEGORICAL_FEATURE_NAMES

TARGET_NAME = 'target'

UNUSED_FEATURE_NAMES = list(set(HEADER) - set(FEATURE_NAMES) - {TARGET_NAME})

print("Header: {}".format(HEADER))
print("Numeric Features: {}".format(NUMERIC_FEATURE_NAMES))
print("Categorical Features: {}".format(CATEGORICAL_FEATURE_NAMES))
print("Target: {}".format(TARGET_NAME))
print("Unused Features: {}".format(UNUSED_FEATURE_NAMES))


Header: ['key', 'x', 'y', 'alpha', 'beta', 'target']
Numeric Features: ['x', 'y']
Categorical Features: ['alpha', 'beta']
Target: target
Unused Features: ['key']

2. Define Data Input Function

  • Input csv file name
  • Load pandas Dataframe
  • Apply feature processing
  • Return a function that returns (features, target) tensors

In [4]:
def process_dataframe(dataset_df):
    
    dataset_df["x_2"] = np.square(dataset_df['x'])
    dataset_df["y_2"] = np.square(dataset_df['y'])
    dataset_df["xy"] = dataset_df['x'] * dataset_df['y']
    dataset_df['dist_xy'] = np.sqrt(np.square(dataset_df['x'] - dataset_df['y']))  # i.e. |x - y|
    
    return dataset_df

def generate_pandas_input_fn(file_name, mode=tf.estimator.ModeKeys.EVAL,
                             skip_header_lines=0,
                             num_epochs=1,
                             batch_size=100):

    df_dataset = pd.read_csv(file_name, names=HEADER, skiprows=skip_header_lines)
    
    x = df_dataset[FEATURE_NAMES].copy()
    if PROCESS_FEATURES:
        x = process_dataframe(x)
    
    y = df_dataset[TARGET_NAME]
        
    shuffle = (mode == tf.estimator.ModeKeys.TRAIN)
    
    num_threads=1
    
    if MULTI_THREADING:
        num_threads = multiprocessing.cpu_count()
        # scale down epochs per thread so the total number of records read stays roughly the same
        num_epochs = int(num_epochs/num_threads) if mode == tf.estimator.ModeKeys.TRAIN else num_epochs
    
    pandas_input_fn = tf.estimator.inputs.pandas_input_fn(
        batch_size=batch_size,
        num_epochs= num_epochs,
        shuffle=shuffle,
        x=x,
        y=y,
        target_column=TARGET_NAME
    )
    
    print("")
    print("* data input_fn:")
    print("================")
    print("Input file: {}".format(file_name))
    print("Dataset size: {}".format(len(df_dataset)))
    print("Batch size: {}".format(batch_size))
    print("Epoch Count: {}".format(num_epochs))
    print("Mode: {}".format(mode))
    print("Thread Count: {}".format(num_threads))
    print("Shuffle: {}".format(shuffle))
    print("================")
    print("")
    
    return pandas_input_fn

In [5]:
features, target = generate_pandas_input_fn(file_name=TRAIN_DATA_FILE)()
print("Feature read from DataFrame: {}".format(list(features.keys())))
print("Target read from DataFrame: {}".format(target))


* data input_fn:
================
Input file: data/train-data.csv
Dataset size: 12000
Batch size: 100
Epoch Count: 1
Mode: eval
Thread Count: 1
Shuffle: False
================

Feature read from DataFrame: ['x', 'y', 'alpha', 'beta', 'x_2', 'y_2', 'xy', 'dist_xy']
Target read from DataFrame: Tensor("fifo_queue_DequeueUpTo:9", shape=(?,), dtype=float64)

3. Define Feature Columns

The input numeric columns are assumed to be normalized (or to have the same scale). Otherwise, a normalizer_fn, along with the normalization params (mean, stdv or min, max), should be passed to the tf.feature_column.numeric_column() constructor.
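
For example, a minimal sketch of z-score scaling via normalizer_fn (the x_mean and x_stdv values are placeholders; in practice they would be computed from the training data):

x_mean, x_stdv = 0.0, 1.0  # placeholder statistics, normally estimated from the training set

x_scaled = tf.feature_column.numeric_column(
    'x',
    normalizer_fn=lambda tensor: (tensor - x_mean) / x_stdv
)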


In [6]:
def get_feature_columns():
    
    
    CONSTRUCTED_NUMERIC_FEATURE_NAMES = ['x_2', 'y_2', 'xy', 'dist_xy']

    # copy the list so repeated calls do not mutate the global NUMERIC_FEATURE_NAMES
    all_numeric_feature_names = list(NUMERIC_FEATURE_NAMES)

    if PROCESS_FEATURES:
        all_numeric_feature_names += CONSTRUCTED_NUMERIC_FEATURE_NAMES

    numeric_columns = {feature_name: tf.feature_column.numeric_column(feature_name)
                       for feature_name in all_numeric_feature_names}

    categorical_column_with_vocabulary = \
        {item[0]: tf.feature_column.categorical_column_with_vocabulary_list(item[0], item[1])
         for item in CATEGORICAL_FEATURE_NAMES_WITH_VOCABULARY.items()}
        
    feature_columns = {}

    if numeric_columns is not None:
        feature_columns.update(numeric_columns)

    if categorical_column_with_vocabulary is not None:
        feature_columns.update(categorical_column_with_vocabulary)
        
    # add extended features (crossing, bucketization, embedding)
    
    feature_columns['alpha_X_beta'] = tf.feature_column.crossed_column(
        [feature_columns['alpha'], feature_columns['beta']], 4)
    
    return feature_columns

feature_columns = get_feature_columns()
print("Feature Columns: {}".format(feature_columns))


Feature Columns: {'x': _NumericColumn(key='x', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None), 'y': _NumericColumn(key='y', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None), 'x_2': _NumericColumn(key='x_2', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None), 'y_2': _NumericColumn(key='y_2', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None), 'xy': _NumericColumn(key='xy', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None), 'dist_xy': _NumericColumn(key='dist_xy', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None), 'alpha': _VocabularyListCategoricalColumn(key='alpha', vocabulary_list=('ax01', 'ax02'), dtype=tf.string, default_value=-1, num_oov_buckets=0), 'beta': _VocabularyListCategoricalColumn(key='beta', vocabulary_list=('bx01', 'bx02'), dtype=tf.string, default_value=-1, num_oov_buckets=0), 'alpha_X_beta': _CrossedColumn(keys=(_VocabularyListCategoricalColumn(key='alpha', vocabulary_list=('ax01', 'ax02'), dtype=tf.string, default_value=-1, num_oov_buckets=0), _VocabularyListCategoricalColumn(key='beta', vocabulary_list=('bx01', 'bx02'), dtype=tf.string, default_value=-1, num_oov_buckets=0)), hash_bucket_size=4, hash_key=None)}

4. Create an Estimator

a. Define an Estimator Creation Function

  • Get dense (numeric) columns from the feature columns
  • Convert categorical columns to indicator columns
  • Instantiate a DNNRegressor estimator given the dense + indicator feature columns + params

In [7]:
def create_estimator(run_config, hparams):
    
    feature_columns = list(get_feature_columns().values())
    
    dense_columns = list(
        filter(lambda column: isinstance(column, feature_column._NumericColumn),
               feature_columns
        )
    )

    categorical_columns = list(
        filter(lambda column: isinstance(column, feature_column._VocabularyListCategoricalColumn) or
                              isinstance(column, feature_column._BucketizedColumn),
               feature_columns)
    )

    indicator_columns = list(
            map(lambda column: tf.feature_column.indicator_column(column),
                categorical_columns)
    )
    
    
    estimator_feature_columns = dense_columns + indicator_columns 
    
    estimator = tf.estimator.DNNRegressor(
        
        feature_columns= estimator_feature_columns,
        hidden_units= hparams.hidden_units,
        
        optimizer= tf.train.AdamOptimizer(),
        activation_fn= tf.nn.elu,
        dropout= hparams.dropout_prob,
        
        config= run_config
    )
    
    print("")
    print("Estimator Type: {}".format(type(estimator)))
    print("")
    
    return estimator

b. Set hyper-parameter values (HParams)


In [8]:
hparams  = tf.contrib.training.HParams(
    num_epochs = 100,
    batch_size = 500,
    hidden_units=[8, 4], 
    dropout_prob = 0.0)


model_dir = 'trained_models/{}'.format(MODEL_NAME)

run_config = tf.estimator.RunConfig().replace(model_dir=model_dir)
print("Model directory: {}".format(run_config.model_dir))
print("Hyper-parameters: {}".format(hparams))


Model directory: trained_models/reg-model-01
Hyper-parameters: [('batch_size', 500), ('dropout_prob', 0.0), ('hidden_units', [8, 4]), ('num_epochs', 100)]

c. Instantiate the estimator


In [9]:
estimator = create_estimator(run_config, hparams)


INFO:tensorflow:Using config: {'_model_dir': 'trained_models/reg-model-01', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': None, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x1280145f8>, '_task_type': 'worker', '_task_id': 0, '_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}

Estimator Type: <class 'tensorflow.python.estimator.canned.dnn.DNNRegressor'>

5. Train the Estimator


In [10]:
train_input_fn = generate_pandas_input_fn(file_name= TRAIN_DATA_FILE, 
                                      mode=tf.estimator.ModeKeys.TRAIN,
                                      num_epochs=hparams.num_epochs,
                                      batch_size=hparams.batch_size) 

if not RESUME_TRAINING:
    shutil.rmtree(model_dir, ignore_errors=True)
    
tf.logging.set_verbosity(tf.logging.INFO)

time_start = datetime.utcnow() 
print("Estimator training started at {}".format(time_start.strftime("%H:%M:%S")))
print(".......................................")

estimator.train(input_fn = train_input_fn)

time_end = datetime.utcnow() 
print(".......................................")
print("Estimator training finished at {}".format(time_end.strftime("%H:%M:%S")))
print("")
time_elapsed = time_end - time_start
print("Estimator training elapsed time: {} seconds".format(time_elapsed.total_seconds()))


* data input_fn:
================
Input file: data/train-data.csv
Dataset size: 12000
Batch size: 500
Epoch Count: 100
Mode: train
Thread Count: 1
Shuffle: True
================

Estimator training started at 19:19:12
.......................................
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Saving checkpoints for 1 into trained_models/reg-model-01/model.ckpt.
INFO:tensorflow:loss = 179225.0, step = 1
INFO:tensorflow:global_step/sec: 166.515
INFO:tensorflow:loss = 124778.0, step = 101 (0.602 sec)
INFO:tensorflow:global_step/sec: 182.042
INFO:tensorflow:loss = 144432.0, step = 201 (0.550 sec)
INFO:tensorflow:global_step/sec: 221.401
INFO:tensorflow:loss = 167542.0, step = 301 (0.451 sec)
INFO:tensorflow:global_step/sec: 208.414
INFO:tensorflow:loss = 146349.0, step = 401 (0.480 sec)
INFO:tensorflow:global_step/sec: 216.184
INFO:tensorflow:loss = 148680.0, step = 501 (0.462 sec)
INFO:tensorflow:global_step/sec: 217.155
INFO:tensorflow:loss = 123907.0, step = 601 (0.460 sec)
INFO:tensorflow:global_step/sec: 209.701
INFO:tensorflow:loss = 113046.0, step = 701 (0.477 sec)
INFO:tensorflow:global_step/sec: 168.637
INFO:tensorflow:loss = 107878.0, step = 801 (0.594 sec)
INFO:tensorflow:global_step/sec: 126.787
INFO:tensorflow:loss = 118305.0, step = 901 (0.788 sec)
INFO:tensorflow:global_step/sec: 138.261
INFO:tensorflow:loss = 101507.0, step = 1001 (0.723 sec)
INFO:tensorflow:global_step/sec: 162.629
INFO:tensorflow:loss = 106166.0, step = 1101 (0.616 sec)
INFO:tensorflow:global_step/sec: 210.706
INFO:tensorflow:loss = 107934.0, step = 1201 (0.474 sec)
INFO:tensorflow:global_step/sec: 175.23
INFO:tensorflow:loss = 98094.9, step = 1301 (0.571 sec)
INFO:tensorflow:global_step/sec: 176.572
INFO:tensorflow:loss = 89144.2, step = 1401 (0.566 sec)
INFO:tensorflow:global_step/sec: 177.678
INFO:tensorflow:loss = 104465.0, step = 1501 (0.563 sec)
INFO:tensorflow:global_step/sec: 183.081
INFO:tensorflow:loss = 92220.2, step = 1601 (0.546 sec)
INFO:tensorflow:global_step/sec: 218.108
INFO:tensorflow:loss = 79086.9, step = 1701 (0.458 sec)
INFO:tensorflow:global_step/sec: 138.97
INFO:tensorflow:loss = 93577.3, step = 1801 (0.724 sec)
INFO:tensorflow:global_step/sec: 145.418
INFO:tensorflow:loss = 75269.3, step = 1901 (0.684 sec)
INFO:tensorflow:global_step/sec: 181.944
INFO:tensorflow:loss = 73518.7, step = 2001 (0.549 sec)
INFO:tensorflow:global_step/sec: 165.012
INFO:tensorflow:loss = 75916.3, step = 2101 (0.607 sec)
INFO:tensorflow:global_step/sec: 130.054
INFO:tensorflow:loss = 65138.1, step = 2201 (0.768 sec)
INFO:tensorflow:global_step/sec: 128.839
INFO:tensorflow:loss = 65868.5, step = 2301 (0.777 sec)
INFO:tensorflow:Saving checkpoints for 2400 into trained_models/reg-model-01/model.ckpt.
INFO:tensorflow:Loss for final step: 88071.1.
.......................................
Estimator training finished at 19:19:30

Estimator training elapsed time: 17.686301 seconds

6. Evaluate the Model


In [11]:
TEST_SIZE = 5000

test_input_fn = generate_pandas_input_fn(file_name=TEST_DATA_FILE, 
                                      mode= tf.estimator.ModeKeys.EVAL,
                                      batch_size= TEST_SIZE)

results = estimator.evaluate(input_fn=test_input_fn)
print("")
print(results)
rmse = round(math.sqrt(results["average_loss"]), 5)  # average_loss is the per-example MSE
print("")
print("RMSE: {}".format(rmse))


* data input_fn:
================
Input file: data/test-data.csv
Dataset size: 5000
Batch size: 5000
Epoch Count: 1
Mode: eval
Thread Count: 1
Shuffle: False
================

INFO:tensorflow:Starting evaluation at 2017-11-14-19:19:30
INFO:tensorflow:Restoring parameters from trained_models/reg-model-01/model.ckpt-2400
INFO:tensorflow:Finished evaluation at 2017-11-14-19:19:31
INFO:tensorflow:Saving dict for global step 2400: average_loss = 164.862, global_step = 2400, loss = 824311.0

{'average_loss': 164.86218, 'loss': 824310.88, 'global_step': 2400}

RMSE: 12.83987

7. Prediction


In [12]:
import itertools

predict_input_fn = generate_pandas_input_fn(file_name=TEST_DATA_FILE, 
                                      mode= tf.estimator.ModeKeys.PREDICT,
                                      batch_size= 5)

predictions = estimator.predict(input_fn=predict_input_fn)
values = list(map(lambda item: item["predictions"][0], itertools.islice(predictions, 5)))
print()
print("Predicted Values: {}".format(values))


* data input_fn:
================
Input file: data/test-data.csv
Dataset size: 5000
Batch size: 5
Epoch Count: 1
Mode: infer
Thread Count: 1
Shuffle: False
================

INFO:tensorflow:Restoring parameters from trained_models/reg-model-01/model.ckpt-2400

Predicted Values: [13.141397, -5.9562521, 11.541443, 3.8178449, 2.1242597]

8. Save & Serve the Model

a. Define Serving Function


In [1]:
def process_features(features):
    
    features["x_2"] = tf.square(features['x'])
    features["y_2"] = tf.square(features['y'])
    features["xy"] = tf.multiply(features['x'], features['y'])
    features['dist_xy'] = tf.sqrt(tf.squared_difference(features['x'], features['y']))  # i.e. |x - y|
    
    return features

def csv_serving_input_fn():
    
    SERVING_HEADER = ['x','y','alpha','beta']
    SERVING_HEADER_DEFAULTS = [[0.0], [0.0], ['NA'], ['NA']]

    rows_string_tensor = tf.placeholder(dtype=tf.string,
                                         shape=[None],
                                         name='csv_rows')
    
    receiver_tensor = {'csv_rows': rows_string_tensor}

    row_columns = tf.expand_dims(rows_string_tensor, -1)
    columns = tf.decode_csv(row_columns, record_defaults=SERVING_HEADER_DEFAULTS)
    features = dict(zip(SERVING_HEADER, columns))
    
    if PROCESS_FEATURES:
        features = process_features(features)

    return tf.estimator.export.ServingInputReceiver(
        features, receiver_tensor)

b. Export SavedModel


In [31]:
export_dir = model_dir + "/export"

estimator.export_savedmodel(
    export_dir_base = export_dir,
    serving_input_receiver_fn = csv_serving_input_fn,
    as_text=True
)


INFO:tensorflow:Restoring parameters from trained_models/reg-model-01/model.ckpt-2400
INFO:tensorflow:Assets added to graph.
INFO:tensorflow:No assets to write.
INFO:tensorflow:SavedModel written to: b"trained_models/reg-model-01/export/temp-b'1510688109'/saved_model.pbtxt"
Out[31]:
b'trained_models/reg-model-01/export/1510688109'

c. Serve the Saved Model


In [35]:
import os

saved_model_dir = export_dir + "/" + os.listdir(path=export_dir)[-1] 

print(saved_model_dir)

predictor_fn = tf.contrib.predictor.from_saved_model(
    export_dir = saved_model_dir,
    signature_def_key="predict"
)

output = predictor_fn({'csv_rows': ["0.5,1,ax01,bx02", "-0.5,-1,ax02,bx02"]})
print(output)


trained_models/reg-model-01/export/1510688109
INFO:tensorflow:Restoring parameters from b'trained_models/reg-model-01/export/1510688109/variables/variables'
{'predictions': array([[ 13.15929985],
       [-13.96904373]], dtype=float32)}

What can we improve?

  • Use data files instead of DataFrames - Pandas DataFrames have to fit in memory and are hard to distribute. Working with (sharded) training data files allows reading records in batches (so we can handle large data sets regardless of the available memory) and also supports distributed training (data parallelism). A file-based input_fn sketch is shown after this list.
  • Use the Experiment API - the Experiment API knows how to invoke the training and evaluation loops in a sensible fashion, for both local & distributed training.
  • Early stopping - use evaluation on the validation set to stop training early and avoid overfitting.
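
A minimal sketch of such a file-based input function follows. It assumes the HEADER, HEADER_DEFAULTS, UNUSED_FEATURE_NAMES, TARGET_NAME and PROCESS_FEATURES definitions above, the TF-ops process_features() from section 8.a, and the tf.data API shipped with TF 1.4; the name generate_csv_input_fn and its parameters are illustrative, not part of this notebook.

def generate_csv_input_fn(file_names, mode=tf.estimator.ModeKeys.EVAL,
                          num_epochs=1, batch_size=200):

    def parse_csv_row(csv_row):
        # decode one CSV line into a (features, target) pair
        columns = tf.decode_csv(csv_row, record_defaults=HEADER_DEFAULTS)
        features = dict(zip(HEADER, columns))
        for column in UNUSED_FEATURE_NAMES:
            features.pop(column)
        target = features.pop(TARGET_NAME)
        if PROCESS_FEATURES:
            features = process_features(features)  # TF-ops version from section 8.a
        return features, target

    def _input_fn():
        # file_names can be a list of (sharded) CSV files
        dataset = tf.data.TextLineDataset(filenames=file_names)
        dataset = dataset.map(parse_csv_row)
        if mode == tf.estimator.ModeKeys.TRAIN:
            dataset = dataset.shuffle(buffer_size=10 * batch_size)
        dataset = dataset.repeat(num_epochs).batch(batch_size)
        features, target = dataset.make_one_shot_iterator().get_next()
        return features, target

    return _input_fn

For the Experiment bullet, a hedged sketch using tf.contrib.learn.Experiment (an API later deprecated in favour of tf.estimator.train_and_evaluate); it reuses the estimator and input functions created earlier in this notebook:

experiment = tf.contrib.learn.Experiment(
    estimator=estimator,            # the DNNRegressor created above
    train_input_fn=train_input_fn,
    eval_input_fn=test_input_fn,
    eval_steps=None                 # evaluate over the whole test set
)
experiment.train_and_evaluate()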

In [ ]: